package com.sea.bei

import com.ververica.cdc.connectors.mysql.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.{DebeziumSourceFunction, StringDebeziumDeserializationSchema}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

/**
  * Date  2021/11/3-15:24
  *
  * 注意：
  *
  *     1 运行该程序前需要开启相应mysql库的binlog日志，才能监控某个库的binlog，flinkcdc才能感知
  *
  *         1) 怎样开启：
  *             找到 C:\ProgramData\MySQL\MySQL Server 5.7\my.ini  的mysqld下添加
  *             开启前三个参数就可以，后面可选择
  *             server_id：MySQL5.7及以上版本开启binlog必须要配置这个选项。对于MySQL集群，不同节点的server_id必须不同。对于单实例部署则没有要求。
  *             log_bin：指定binlog文件名和储存位置。如果不指定路径，默认位置为/var/lib/mysql/。
  *             binlog_format：binlog格式。有3个值可以选择：ROW：记录哪条数据被修改和修改之后的数据，会产生大量日志。STATEMENT：记录修改数据的SQL，日志量较小。MIXED：混合使用上述两个模式。CDC要求必须配置为ROW。
  *             expire_logs_days：bin_log过期时间，超过该时间的log会自动删除。
  *             binlog_do_db：binlog记录哪些数据库。如果需要配置多个库，如例子中配置多项。切勿使用逗号分隔。
  *
  *            我的设置：
  *               server_id=1
  *               log_bin = mysql-bin
  *               binlog_format=row
  *               binlog-do-db=flinkcdc
  *
  *         2) 开启后创建相应的表，D:\Software\Mysql\MySQL Server 5.7\Data 会有相应的 mysql-bin.000001 和 mysql-bin.index
  *
  *   2 启动程序 bin/flink run -m yarn-cluster -c com.sea.bei.FlinkCDCAPI jar包
  *             bin/flink   savepoint   jobid   保存数据到hdfs的目录
  *             bin/flink run -m yarn-cluster -s 保存数据到hdfs的目录 -c jar包
  *
  *   3 借助savepoint 或者checkpoint 可以实现断点续传的功能，也就是说当做savepoint后，信息自动停止。其后数据发生变化，如 修改、删除、添加
  *     如果再以之前savepoint或者checkpoint的起点启动程序，flink可以断点续传，即flink启动还会读binlog日志并感知到程序停止之后数据做了怎样的变动
  *     并消费出来
  *
  *   4  startupOptions(StartupOptions.initial())
  *
  *
  *      //initial (default): Performs an initial snapshot on the monitored database tables upon
          first startup, and continue to read the latest binlog.
  
          ---> 读取现有表的数据，并读取binlog的最新数据并做持续跟踪
  
        //latest-offset: Never to perform snapshot on the monitored database tables upon first
            startup, just read from the end of the binlog which means only have the changes since the
            connector was started.
  
          ---> 不会读取现有表的数据，只会读取binlog的最新数据并做持续跟踪
   
        //timestamp: Never to perform snapshot on the monitored database tables upon first
            startup, and directly read binlog from the specified timestamp. The consumer will traverse the
            binlog from the beginning and ignore change events whose timestamp is smaller than the
            specified timestamp.
          
          ---> 不会读取现有表的数据，直接会读取binlog当中比指定timestamp大的binlog数据，并对binlog做持续的跟踪
        
        //specific-offset: Never to perform snapshot on the monitored database tables upon
           first startup, and directly read binlog from the specified offset.
  
        ---> 不会读取现有表的数据，直接会读取binlog当中比指定specific-offset大的binlog数据，并对binlog做持续的跟踪
  
    5 flink-cdc 2.x版本中，API的编程方式 可以是 flink 1.12.x及以上的版本，但是SQL的编程方式 必须是 flink 1.13.x 及以上的版本
  
    
  *
  */
object FlinkCDCAPI {
  
  def main(args: Array[String]): Unit = {
  
    // 1 创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
  
    // 2 开启CK
//    env.enableCheckpointing(5000)
//    env.getCheckpointConfig.setCheckpointTimeout(5000)
//    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
//    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//    env.setStateBackend(new FsStateBackend("hdfs://hadoop156:9000/flinkcdc/ck"))
    
    //3.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序
    val source: DebeziumSourceFunction[String] = MySqlSource.builder()
      .hostname("localhost")
      .port(3306)
      .username("root")
      .password("123456")
      .serverTimeZone("Asia/Shanghai") // 必须加上，要不然不能使用
      .databaseList("flinkcdc")//flinkcdc
      .tableList("flinkcdc.user_info")
      .deserializer(new StringDebeziumDeserializationSchema())
      .startupOptions(StartupOptions.initial())
      .build()
    
    env.addSource(source).print()
    
    env.execute("FlinkCDC")
    
  }
  
}
